package com.atguigu.day08;

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.IOException;

public class Flink01_StatBackend {
    public static void main(String[] args) throws IOException {
        //1.获取流的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO 状态后端的设置

        //TODO 内存级别
        //老版本写法
        env.setStateBackend(new MemoryStateBackend());

        //新版本写法
        env.setStateBackend(new HashMapStateBackend());
        //声明Checkpoint存储位置
        env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());


        //TODO 文件级别
        //老版本写法
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/ck"));

        //新版本写法
        env.setStateBackend(new HashMapStateBackend());
        //声明Checkpoint位置
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck");


        //TODO RocksDB
        //老版写法
        env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop102:8020/ck/rocksDB..."));

        //新版写法
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        //声明Checkpoint位置
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck/rocksDB...");

        //TODO 不对齐的barrier
        env.getCheckpointConfig().enableUnalignedCheckpoints();
    }
}
